package com.chatplus.application.web.notification;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.chatplus.application.common.logging.SouthernQuietLogger;
import com.chatplus.application.common.logging.SouthernQuietLoggerFactory;
import com.chatplus.application.common.util.PlusJsonUtils;
import com.chatplus.application.dao.framework.PendingNotificationDao;
import com.chatplus.application.domain.entity.framework.PendingNotificationEntity;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;

/**
 * Redis stream listener that dispatches each received record to a reflective handler method.
 *
 * <p>For every record the listener parses the {@code "payload"} entry into the type declared by
 * {@link NotificationListener#notification()}, builds the handler's argument list and invokes
 * {@code method} on {@code bean}. When handling fails, the record is written to (or updated in)
 * the pending-notification table through {@link PendingNotificationDao}.
 */
public class RedisStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
    private static final SouthernQuietLogger LOGGER = SouthernQuietLoggerFactory.getLogger(RedisStreamListener.class);

    /** Once a pending record's retry counter reaches this value the record is deleted. */
    private static final int MAX_RETRY_COUNT = 3;

    private final Object bean;
    private final Method method;
    private final NotificationListener listener;
    private final PendingNotificationDao pendingNotificationDao;

    /**
     * @param listener               listener metadata; supplies the notification type and may be
     *                               injected into the handler as an argument
     * @param bean                   target object the handler method is invoked on
     * @param method                 handler method to invoke for each record
     * @param pendingNotificationDao DAO used to read, insert, update and delete pending records
     */
    public RedisStreamListener(NotificationListener listener,
                               Object bean,
                               Method method,
                               PendingNotificationDao pendingNotificationDao) {
        this.method = method;
        this.bean = bean;
        this.listener = listener;
        this.pendingNotificationDao = pendingNotificationDao;
    }

    /**
     * Handles one stream record: parses its payload, invokes the handler method and records
     * failures in the pending-notification table.
     *
     * @param message the received stream record
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        String messageId = message.getId().getValue();
        LOGGER.message("收到消息")
                .context("messageId", message.getId())
                .context("stream", message.getStream())
                .info();
        // An existing row means this message id has already failed at least once.
        PendingNotificationEntity pendingNotificationEntity = pendingNotificationDao.selectOne(
                new LambdaQueryWrapper<PendingNotificationEntity>().eq(PendingNotificationEntity::getMessageId, messageId));
        try {
            ParameterizedTypeReference<?> typeReference = ParameterizedTypeReference.forType(listener.notification());
            Class<?> notificationClass = Class.forName(typeReference.getType().getTypeName());
            Object notification = PlusJsonUtils.parseObject(message.getValue().get("payload"), notificationClass);
            Object[] arguments = Arrays.stream(method.getParameterTypes())
                    .map(parameterType -> resolveArgument(parameterType, notification))
                    .toArray();

            method.invoke(bean, arguments);
        } catch (Exception ex) {
            LOGGER.message("通知处理器抛出异常,消息进入重试队列").context("messageId", messageId).exception(ex).error();
            // On failure the message is written to the retry queue to avoid blocking the stream;
            // per the original note, the current queue acknowledges by default.
            // Method.invoke wraps handler failures in InvocationTargetException (null message),
            // so unwrap first to persist the real failure reason.
            handleError(pendingNotificationEntity, message, unwrapInvocationFailure(ex));
        } finally {
            // Only a row that existed before this call can reach the limit here: a row inserted by
            // handleError is not visible through this local reference.
            if (pendingNotificationEntity != null && pendingNotificationEntity.getRetryNum() >= MAX_RETRY_COUNT) {
                // Retry limit reached: delete the record so it is not retried again.
                pendingNotificationDao.deleteById(pendingNotificationEntity.getId());
            }
        }
    }

    /**
     * Chooses the argument to pass for one handler parameter.
     *
     * @param parameterClass declared type of the handler parameter
     * @param notification   the parsed notification payload
     * @return the notification or the listener when the parameter type accepts it; otherwise a
     *         no-arg-constructed instance of the parameter type, or {@code null} if that fails
     */
    private Object resolveArgument(Class<?> parameterClass, Object notification) {
        if (parameterClass.isInstance(notification)) {
            return notification;
        }
        if (parameterClass.isInstance(listener)) {
            return listener;
        }
        // Log the declared parameter type (not java.lang.reflect.Parameter) so the warning is actionable.
        LOGGER.message("不支持在通知监听器中使用此类型的参数")
                .context("parameter", parameterClass)
                .context("notification", listener.notification())
                .warn();
        try {
            return parameterClass.getDeclaredConstructor().newInstance();
        } catch (Exception ignored) {
            // Best effort only: the unsupported type was already reported above.
            return null;
        }
    }

    /**
     * Returns the exception thrown by the handler itself when {@code ex} is the reflective wrapper
     * produced by {@link Method#invoke}; otherwise returns {@code ex} unchanged.
     */
    private static Throwable unwrapInvocationFailure(Exception ex) {
        if (ex instanceof InvocationTargetException && ex.getCause() != null) {
            return ex.getCause();
        }
        return ex;
    }

    /**
     * Persists a failed message: inserts a new pending record with retry count 0 when none exists,
     * otherwise increments the retry count of the existing record and updates it.
     *
     * @param pendingNotificationEntity existing pending record for the message, or {@code null}
     * @param message                   the stream record that failed
     * @param failure                   the failure whose message (or simple class name when the
     *                                  message is empty) is stored as the cause
     */
    private void handleError(PendingNotificationEntity pendingNotificationEntity,
                             MapRecord<String, String, String> message,
                             Throwable failure) {
        boolean update = true;
        if (pendingNotificationEntity == null) {
            pendingNotificationEntity = new PendingNotificationEntity();
            pendingNotificationEntity.setRetryNum(0);
            update = false;
        } else {
            pendingNotificationEntity.setRetryNum(pendingNotificationEntity.getRetryNum() + 1);
        }
        pendingNotificationEntity.setMessageId(message.getId().getValue());
        pendingNotificationEntity.setStreamKey(message.getStream());
        pendingNotificationEntity.setBody(PlusJsonUtils.toJsonString(message.getValue()));
        pendingNotificationEntity.setCause(
                StringUtils.isEmpty(failure.getMessage()) ? failure.getClass().getSimpleName() : failure.getMessage());
        if (update) {
            pendingNotificationDao.updateById(pendingNotificationEntity);
        } else {
            pendingNotificationDao.insert(pendingNotificationEntity);
        }
    }
}
